// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <aries/parquet/metadata.h>

#include <algorithm>
#include <cinttypes>
#include <memory>
#include <ostream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include <nebula/io/memory.h>
#include <nebula/util/key_value_metadata.h>
#include <turbo/log/logging.h>
#include <aries/parquet/encryption/encryption_internal.h>
#include <aries/parquet/encryption/internal_file_decryptor.h>
#include <aries/parquet/exception.h>
#include <aries/parquet/schema.h>
#include "aries/parquet/schema_internal.h"
#include <aries/parquet/thrift_internal.h>

namespace parquet {
    const ApplicationVersion &ApplicationVersion::PARQUET_251_FIXED_VERSION() {
        static ApplicationVersion version("parquet-mr", 1, 8, 0);
        return version;
    }

    const ApplicationVersion &ApplicationVersion::PARQUET_816_FIXED_VERSION() {
        static ApplicationVersion version("parquet-mr", 1, 2, 9);
        return version;
    }

    const ApplicationVersion &ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION() {
        static ApplicationVersion version("parquet-cpp", 1, 3, 0);
        return version;
    }

    const ApplicationVersion &ApplicationVersion::PARQUET_MR_FIXED_STATS_VERSION() {
        static ApplicationVersion version("parquet-mr", 1, 10, 0);
        return version;
    }

    const ApplicationVersion &ApplicationVersion::PARQUET_CPP_10353_FIXED_VERSION() {
        // parquet-cpp versions released prior to Arrow 3.0 would write DataPageV2 pages
        // with is_compressed==0 but still write compressed data. (See: ARROW-10353).
        // Parquet 1.5.1 had this problem, and after that we switched to the
        // application name "parquet-cpp-arrow", so this version is fake.
        static ApplicationVersion version("parquet-cpp", 2, 0, 0);
        return version;
    }

    std::string ParquetVersionToString(ParquetVersion::type ver) {
        switch (ver) {
            case ParquetVersion::PARQUET_1_0:
                return "1.0";
            case ParquetVersion::PARQUET_2_0:
                return "pseudo-2.0";
            case ParquetVersion::PARQUET_2_4:
                return "2.4";
            case ParquetVersion::PARQUET_2_6:
                return "2.6";
        }

        // This should be unreachable
        return "UNKNOWN";
    }

    template<typename DType>
    static std::shared_ptr<Statistics> MakeTypedColumnStats(
        const format::ColumnMetaData &metadata, const ColumnDescriptor *descr) {
        // If ColumnOrder is defined, return max_value and min_value
        if (descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER) {
            return MakeStatistics<DType>(
                descr, metadata.statistics.min_value, metadata.statistics.max_value,
                metadata.num_values - metadata.statistics.null_count,
                metadata.statistics.null_count, metadata.statistics.distinct_count,
                metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value,
                metadata.statistics.__isset.null_count,
                metadata.statistics.__isset.distinct_count);
        }
        // Default behavior
        return MakeStatistics<DType>(
            descr, metadata.statistics.min, metadata.statistics.max,
            metadata.num_values - metadata.statistics.null_count,
            metadata.statistics.null_count, metadata.statistics.distinct_count,
            metadata.statistics.__isset.max || metadata.statistics.__isset.min,
            metadata.statistics.__isset.null_count, metadata.statistics.__isset.distinct_count);
    }

    std::shared_ptr<Statistics> MakeColumnStats(const format::ColumnMetaData &meta_data,
                                                const ColumnDescriptor *descr) {
        switch (static_cast<Type::type>(meta_data.type)) {
            case Type::BOOLEAN:
                return MakeTypedColumnStats<BooleanType>(meta_data, descr);
            case Type::INT32:
                return MakeTypedColumnStats<Int32Type>(meta_data, descr);
            case Type::INT64:
                return MakeTypedColumnStats<Int64Type>(meta_data, descr);
            case Type::INT96:
                return MakeTypedColumnStats<Int96Type>(meta_data, descr);
            case Type::DOUBLE:
                return MakeTypedColumnStats<DoubleType>(meta_data, descr);
            case Type::FLOAT:
                return MakeTypedColumnStats<FloatType>(meta_data, descr);
            case Type::BYTE_ARRAY:
                return MakeTypedColumnStats<ByteArrayType>(meta_data, descr);
            case Type::FIXED_LEN_BYTE_ARRAY:
                return MakeTypedColumnStats<FLBAType>(meta_data, descr);
            case Type::UNDEFINED:
                break;
        }
        throw ParquetException("Can't decode page statistics for selected column type");
    }

    // MetaData Accessor

    // ColumnCryptoMetaData
    class ColumnCryptoMetaData::ColumnCryptoMetaDataImpl {
    public:
        explicit ColumnCryptoMetaDataImpl(const format::ColumnCryptoMetaData *crypto_metadata)
            : crypto_metadata_(crypto_metadata) {
        }

        bool encrypted_with_footer_key() const {
            return crypto_metadata_->__isset.ENCRYPTION_WITH_FOOTER_KEY;
        }

        bool encrypted_with_column_key() const {
            return crypto_metadata_->__isset.ENCRYPTION_WITH_COLUMN_KEY;
        }

        std::shared_ptr<schema::ColumnPath> path_in_schema() const {
            return std::make_shared<schema::ColumnPath>(
                crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.path_in_schema);
        }

        const std::string &key_metadata() const {
            return crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.key_metadata;
        }

    private:
        const format::ColumnCryptoMetaData *crypto_metadata_;
    };

    std::unique_ptr<ColumnCryptoMetaData> ColumnCryptoMetaData::Make(
        const uint8_t *metadata) {
        return std::unique_ptr<ColumnCryptoMetaData>(new ColumnCryptoMetaData(metadata));
    }

    ColumnCryptoMetaData::ColumnCryptoMetaData(const uint8_t *metadata)
        : impl_(std::make_unique<ColumnCryptoMetaDataImpl>(
            reinterpret_cast<const format::ColumnCryptoMetaData *>(metadata))) {
    }

    ColumnCryptoMetaData::~ColumnCryptoMetaData() = default;

    std::shared_ptr<schema::ColumnPath> ColumnCryptoMetaData::path_in_schema() const {
        return impl_->path_in_schema();
    }

    bool ColumnCryptoMetaData::encrypted_with_footer_key() const {
        return impl_->encrypted_with_footer_key();
    }

    const std::string &ColumnCryptoMetaData::key_metadata() const {
        return impl_->key_metadata();
    }

    // ColumnChunk metadata
    class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
    public:
        explicit ColumnChunkMetaDataImpl(const format::ColumnChunk *column,
                                         const ColumnDescriptor *descr,
                                         int16_t row_group_ordinal, int16_t column_ordinal,
                                         const ReaderProperties &properties,
                                         const ApplicationVersion *writer_version,
                                         std::shared_ptr<InternalFileDecryptor> file_decryptor)
            : column_(column),
              descr_(descr),
              properties_(properties),
              writer_version_(writer_version) {
            column_metadata_ = &column->meta_data;
            if (column->__isset.crypto_metadata) {
                // column metadata is encrypted
                format::ColumnCryptoMetaData ccmd = column->crypto_metadata;

                if (ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY) {
                    if (file_decryptor != nullptr && file_decryptor->properties() != nullptr) {
                        // should decrypt metadata
                        std::shared_ptr<schema::ColumnPath> path = std::make_shared<schema::ColumnPath>(
                            ccmd.ENCRYPTION_WITH_COLUMN_KEY.path_in_schema);
                        std::string key_metadata = ccmd.ENCRYPTION_WITH_COLUMN_KEY.key_metadata;

                        std::string aad_column_metadata = encryption::CreateModuleAad(
                            file_decryptor->file_aad(), encryption::kColumnMetaData, row_group_ordinal,
                            column_ordinal, static_cast<int16_t>(-1));
                        auto decryptor = file_decryptor->GetColumnMetaDecryptor(
                            path->ToDotString(), key_metadata, aad_column_metadata);
                        auto len = static_cast<uint32_t>(column->encrypted_column_metadata.size());
                        ThriftDeserializer deserializer(properties_);
                        deserializer.DeserializeMessage(
                            reinterpret_cast<const uint8_t *>(column->encrypted_column_metadata.c_str()),
                            &len, &decrypted_metadata_, decryptor.get());
                        column_metadata_ = &decrypted_metadata_;
                    } else {
                        throw ParquetException(
                            "Cannot decrypt ColumnMetadata."
                            " FileDecryption is not setup correctly");
                    }
                }
            }
            for (const auto &encoding: column_metadata_->encodings) {
                encodings_.push_back(LoadEnumSafe(&encoding));
            }
            for (const auto &encoding_stats: column_metadata_->encoding_stats) {
                encoding_stats_.push_back({
                    LoadEnumSafe(&encoding_stats.page_type),
                    LoadEnumSafe(&encoding_stats.encoding),
                    encoding_stats.count
                });
            }
            possible_stats_ = nullptr;
        }

        bool Equals(const ColumnChunkMetaDataImpl &other) const {
            return *column_metadata_ == *other.column_metadata_;
        }

        // column chunk
        inline int64_t file_offset() const { return column_->file_offset; }
        inline const std::string &file_path() const { return column_->file_path; }

        inline Type::type type() const { return LoadEnumSafe(&column_metadata_->type); }

        inline int64_t num_values() const { return column_metadata_->num_values; }

        std::shared_ptr<schema::ColumnPath> path_in_schema() {
            return std::make_shared<schema::ColumnPath>(column_metadata_->path_in_schema);
        }

        // Check if statistics are set and are valid
        // 1) Must be set in the metadata
        // 2) Statistics must not be corrupted
        inline bool is_stats_set() const {
            DKCHECK(writer_version_ != nullptr);
            // If the column statistics don't exist or column sort order is unknown
            // we cannot use the column stats
            if (!column_metadata_->__isset.statistics ||
                descr_->sort_order() == SortOrder::UNKNOWN) {
                return false;
            }
            if (possible_stats_ == nullptr) {
                possible_stats_ = MakeColumnStats(*column_metadata_, descr_);
            }
            EncodedStatistics encodedStatistics = possible_stats_->Encode();
            return writer_version_->HasCorrectStatistics(type(), encodedStatistics,
                                                         descr_->sort_order());
        }

        inline std::shared_ptr<Statistics> statistics() const {
            return is_stats_set() ? possible_stats_ : nullptr;
        }

        inline CompressionType compression() const {
            return LoadEnumSafe(&column_metadata_->codec);
        }

        const std::vector<Encoding::type> &encodings() const { return encodings_; }

        const std::vector<PageEncodingStats> &encoding_stats() const { return encoding_stats_; }

        inline std::optional<int64_t> bloom_filter_offset() const {
            if (column_metadata_->__isset.bloom_filter_offset) {
                return column_metadata_->bloom_filter_offset;
            }
            return std::nullopt;
        }

        inline std::optional<int64_t> bloom_filter_length() const {
            if (column_metadata_->__isset.bloom_filter_length) {
                return column_metadata_->bloom_filter_length;
            }
            return std::nullopt;
        }

        inline bool has_dictionary_page() const {
            return column_metadata_->__isset.dictionary_page_offset;
        }

        inline int64_t dictionary_page_offset() const {
            return column_metadata_->dictionary_page_offset;
        }

        inline int64_t data_page_offset() const { return column_metadata_->data_page_offset; }

        inline bool has_index_page() const {
            return column_metadata_->__isset.index_page_offset;
        }

        inline int64_t index_page_offset() const { return column_metadata_->index_page_offset; }

        inline int64_t total_compressed_size() const {
            return column_metadata_->total_compressed_size;
        }

        inline int64_t total_uncompressed_size() const {
            return column_metadata_->total_uncompressed_size;
        }

        inline std::unique_ptr<ColumnCryptoMetaData> crypto_metadata() const {
            if (column_->__isset.crypto_metadata) {
                return ColumnCryptoMetaData::Make(
                    reinterpret_cast<const uint8_t *>(&column_->crypto_metadata));
            } else {
                return nullptr;
            }
        }

        std::optional<IndexLocation> GetColumnIndexLocation() const {
            if (column_->__isset.column_index_offset && column_->__isset.column_index_length) {
                return IndexLocation{column_->column_index_offset, column_->column_index_length};
            }
            return std::nullopt;
        }

        std::optional<IndexLocation> GetOffsetIndexLocation() const {
            if (column_->__isset.offset_index_offset && column_->__isset.offset_index_length) {
                return IndexLocation{column_->offset_index_offset, column_->offset_index_length};
            }
            return std::nullopt;
        }

    private:
        mutable std::shared_ptr<Statistics> possible_stats_;
        std::vector<Encoding::type> encodings_;
        std::vector<PageEncodingStats> encoding_stats_;
        const format::ColumnChunk *column_;
        const format::ColumnMetaData *column_metadata_;
        format::ColumnMetaData decrypted_metadata_;
        const ColumnDescriptor *descr_;
        const ReaderProperties properties_;
        const ApplicationVersion *writer_version_;
    };

    std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
        const void *metadata, const ColumnDescriptor *descr,
        const ReaderProperties &properties, const ApplicationVersion *writer_version,
        int16_t row_group_ordinal, int16_t column_ordinal,
        std::shared_ptr<InternalFileDecryptor> file_decryptor) {
        return std::unique_ptr<ColumnChunkMetaData>(
            new ColumnChunkMetaData(metadata, descr, row_group_ordinal, column_ordinal,
                                    properties, writer_version, std::move(file_decryptor)));
    }

    std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
        const void *metadata, const ColumnDescriptor *descr,
        const ApplicationVersion *writer_version, int16_t row_group_ordinal,
        int16_t column_ordinal, std::shared_ptr<InternalFileDecryptor> file_decryptor) {
        return std::unique_ptr<ColumnChunkMetaData>(new ColumnChunkMetaData(
            metadata, descr, row_group_ordinal, column_ordinal, default_reader_properties(),
            writer_version, std::move(file_decryptor)));
    }

    ColumnChunkMetaData::ColumnChunkMetaData(
        const void *metadata, const ColumnDescriptor *descr, int16_t row_group_ordinal,
        int16_t column_ordinal, const ReaderProperties &properties,
        const ApplicationVersion *writer_version,
        std::shared_ptr<InternalFileDecryptor> file_decryptor)
        : impl_{
            new ColumnChunkMetaDataImpl(
                reinterpret_cast<const format::ColumnChunk *>(metadata), descr,
                row_group_ordinal, column_ordinal, properties, writer_version,
                std::move(file_decryptor))
        } {
    }

    ColumnChunkMetaData::~ColumnChunkMetaData() = default;

    // column chunk
    int64_t ColumnChunkMetaData::file_offset() const { return impl_->file_offset(); }

    const std::string &ColumnChunkMetaData::file_path() const { return impl_->file_path(); }

    Type::type ColumnChunkMetaData::type() const { return impl_->type(); }

    int64_t ColumnChunkMetaData::num_values() const { return impl_->num_values(); }

    std::shared_ptr<schema::ColumnPath> ColumnChunkMetaData::path_in_schema() const {
        return impl_->path_in_schema();
    }

    std::shared_ptr<Statistics> ColumnChunkMetaData::statistics() const {
        return impl_->statistics();
    }

    bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set(); }

    std::optional<int64_t> ColumnChunkMetaData::bloom_filter_offset() const {
        return impl_->bloom_filter_offset();
    }

    std::optional<int64_t> ColumnChunkMetaData::bloom_filter_length() const {
        return impl_->bloom_filter_length();
    }

    bool ColumnChunkMetaData::has_dictionary_page() const {
        return impl_->has_dictionary_page();
    }

    int64_t ColumnChunkMetaData::dictionary_page_offset() const {
        return impl_->dictionary_page_offset();
    }

    int64_t ColumnChunkMetaData::data_page_offset() const {
        return impl_->data_page_offset();
    }

    bool ColumnChunkMetaData::has_index_page() const { return impl_->has_index_page(); }

    int64_t ColumnChunkMetaData::index_page_offset() const {
        return impl_->index_page_offset();
    }

    CompressionType ColumnChunkMetaData::compression() const {
        return impl_->compression();
    }

    bool ColumnChunkMetaData::can_decompress() const {
        return ::nebula::Codec::IsAvailable(compression());
    }

    const std::vector<Encoding::type> &ColumnChunkMetaData::encodings() const {
        return impl_->encodings();
    }

    const std::vector<PageEncodingStats> &ColumnChunkMetaData::encoding_stats() const {
        return impl_->encoding_stats();
    }

    int64_t ColumnChunkMetaData::total_uncompressed_size() const {
        return impl_->total_uncompressed_size();
    }

    int64_t ColumnChunkMetaData::total_compressed_size() const {
        return impl_->total_compressed_size();
    }

    std::unique_ptr<ColumnCryptoMetaData> ColumnChunkMetaData::crypto_metadata() const {
        return impl_->crypto_metadata();
    }

    std::optional<IndexLocation> ColumnChunkMetaData::GetColumnIndexLocation() const {
        return impl_->GetColumnIndexLocation();
    }

    std::optional<IndexLocation> ColumnChunkMetaData::GetOffsetIndexLocation() const {
        return impl_->GetOffsetIndexLocation();
    }

    bool ColumnChunkMetaData::Equals(const ColumnChunkMetaData &other) const {
        return impl_->Equals(*other.impl_);
    }

    // row-group metadata
    class RowGroupMetaData::RowGroupMetaDataImpl {
    public:
        explicit RowGroupMetaDataImpl(const format::RowGroup *row_group,
                                      const SchemaDescriptor *schema,
                                      const ReaderProperties &properties,
                                      const ApplicationVersion *writer_version,
                                      std::shared_ptr<InternalFileDecryptor> file_decryptor)
            : row_group_(row_group),
              schema_(schema),
              properties_(properties),
              writer_version_(writer_version),
              file_decryptor_(std::move(file_decryptor)) {
            if (TURBO_UNLIKELY(row_group_->columns.size() >
                static_cast<size_t>(std::numeric_limits<int>::max()))) {
                throw ParquetException("Row group had too many columns: ",
                                       row_group_->columns.size());
            }
        }

        bool Equals(const RowGroupMetaDataImpl &other) const {
            return *row_group_ == *other.row_group_;
        }

        inline int num_columns() const { return static_cast<int>(row_group_->columns.size()); }

        inline int64_t num_rows() const { return row_group_->num_rows; }

        inline int64_t total_byte_size() const { return row_group_->total_byte_size; }

        inline int64_t total_compressed_size() const {
            return row_group_->total_compressed_size;
        }

        inline int64_t file_offset() const { return row_group_->file_offset; }

        inline const SchemaDescriptor *schema() const { return schema_; }

        std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) {
            if (i >= 0 && i < num_columns()) {
                return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i),
                                                 properties_, writer_version_, row_group_->ordinal,
                                                 i, file_decryptor_);
            }
            throw ParquetException("The file only has ", num_columns(),
                                   " columns, requested metadata for column: ", i);
        }

        std::vector<SortingColumn> sorting_columns() const {
            std::vector<SortingColumn> sorting_columns;
            if (!row_group_->__isset.sorting_columns) {
                return sorting_columns;
            }
            sorting_columns.resize(row_group_->sorting_columns.size());
            for (size_t i = 0; i < sorting_columns.size(); ++i) {
                sorting_columns[i] = FromThrift(row_group_->sorting_columns[i]);
            }
            return sorting_columns;
        }

    private:
        const format::RowGroup *row_group_;
        const SchemaDescriptor *schema_;
        const ReaderProperties properties_;
        const ApplicationVersion *writer_version_;
        std::shared_ptr<InternalFileDecryptor> file_decryptor_;
    };

    std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
        const void *metadata, const SchemaDescriptor *schema,
        const ApplicationVersion *writer_version,
        std::shared_ptr<InternalFileDecryptor> file_decryptor) {
        return std::unique_ptr<parquet::RowGroupMetaData>(
            new RowGroupMetaData(metadata, schema, default_reader_properties(), writer_version,
                                 std::move(file_decryptor)));
    }

    std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
        const void *metadata, const SchemaDescriptor *schema,
        const ReaderProperties &properties, const ApplicationVersion *writer_version,
        std::shared_ptr<InternalFileDecryptor> file_decryptor) {
        return std::unique_ptr<parquet::RowGroupMetaData>(new RowGroupMetaData(
            metadata, schema, properties, writer_version, std::move(file_decryptor)));
    }

    RowGroupMetaData::RowGroupMetaData(const void *metadata, const SchemaDescriptor *schema,
                                       const ReaderProperties &properties,
                                       const ApplicationVersion *writer_version,
                                       std::shared_ptr<InternalFileDecryptor> file_decryptor)
        : impl_{
            new RowGroupMetaDataImpl(reinterpret_cast<const format::RowGroup *>(metadata),
                                     schema, properties, writer_version,
                                     std::move(file_decryptor))
        } {
    }

    RowGroupMetaData::~RowGroupMetaData() = default;

    bool RowGroupMetaData::Equals(const RowGroupMetaData &other) const {
        return impl_->Equals(*other.impl_);
    }

    int RowGroupMetaData::num_columns() const { return impl_->num_columns(); }

    int64_t RowGroupMetaData::num_rows() const { return impl_->num_rows(); }

    int64_t RowGroupMetaData::total_byte_size() const { return impl_->total_byte_size(); }

    int64_t RowGroupMetaData::total_compressed_size() const {
        return impl_->total_compressed_size();
    }

    int64_t RowGroupMetaData::file_offset() const { return impl_->file_offset(); }

    const SchemaDescriptor *RowGroupMetaData::schema() const { return impl_->schema(); }

    std::unique_ptr<ColumnChunkMetaData> RowGroupMetaData::ColumnChunk(int i) const {
        return impl_->ColumnChunk(i);
    }

    bool RowGroupMetaData::can_decompress() const {
        int n_columns = num_columns();
        for (int i = 0; i < n_columns; i++) {
            if (!ColumnChunk(i)->can_decompress()) {
                return false;
            }
        }
        return true;
    }

    std::vector<SortingColumn> RowGroupMetaData::sorting_columns() const {
        return impl_->sorting_columns();
    }

    // file metadata
    class FileMetaData::FileMetaDataImpl {
    public:
        FileMetaDataImpl() = default;

        explicit FileMetaDataImpl(
            const void *metadata, uint32_t *metadata_len, ReaderProperties properties,
            std::shared_ptr<InternalFileDecryptor> file_decryptor = nullptr)
            : properties_(std::move(properties)), file_decryptor_(std::move(file_decryptor)) {
            metadata_ = std::make_unique<format::FileMetaData>();

            auto footer_decryptor =
                    file_decryptor_ != nullptr ? file_decryptor_->GetFooterDecryptor() : nullptr;

            ThriftDeserializer deserializer(properties_);
            deserializer.DeserializeMessage(reinterpret_cast<const uint8_t *>(metadata),
                                            metadata_len, metadata_.get(),
                                            footer_decryptor.get());
            metadata_len_ = *metadata_len;

            if (metadata_->__isset.created_by) {
                writer_version_ = ApplicationVersion(metadata_->created_by);
            } else {
                writer_version_ = ApplicationVersion("unknown 0.0.0");
            }

            InitSchema();
            InitColumnOrders();
            InitKeyValueMetadata();
        }

        bool VerifySignature(const void *signature) {
            // verify decryption properties are set
            if (file_decryptor_ == nullptr) {
                throw ParquetException("Decryption not set properly. cannot verify signature");
            }
            // serialize the footer
            uint8_t *serialized_data;
            uint32_t serialized_len = metadata_len_;
            ThriftSerializer serializer;
            serializer.SerializeToBuffer(metadata_.get(), &serialized_len, &serialized_data);

            // encrypt with nonce
            auto nonce = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(signature));
            auto tag = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(signature)) +
                       encryption::kNonceLength;

            std::string key = file_decryptor_->GetFooterKey();
            std::string aad = encryption::CreateFooterAad(file_decryptor_->file_aad());

            auto aes_encryptor = encryption::AesEncryptor::Make(
                file_decryptor_->algorithm(), static_cast<int>(key.size()), true,
                false /*write_length*/, nullptr);

            std::shared_ptr<Buffer> encrypted_buffer = std::static_pointer_cast<ResizableBuffer>(
                allocate_buffer(file_decryptor_->pool(),
                                aes_encryptor->CiphertextSizeDelta() + serialized_len));
            uint32_t encrypted_len = aes_encryptor->SignedFooterEncrypt(
                serialized_data, serialized_len, str2bytes(key), static_cast<int>(key.size()),
                str2bytes(aad), static_cast<int>(aad.size()), nonce,
                encrypted_buffer->mutable_data());
            // Delete AES encryptor object. It was created only to verify the footer signature.
            aes_encryptor->WipeOut();
            delete aes_encryptor;
            return 0 ==
                   memcmp(encrypted_buffer->data() + encrypted_len - encryption::kGcmTagLength,
                          tag, encryption::kGcmTagLength);
        }

        inline uint32_t size() const { return metadata_len_; }
        inline int num_columns() const { return schema_.num_columns(); }
        inline int64_t num_rows() const { return metadata_->num_rows; }

        inline int num_row_groups() const {
            return static_cast<int>(metadata_->row_groups.size());
        }

        inline int32_t version() const { return metadata_->version; }
        inline const std::string &created_by() const { return metadata_->created_by; }

        inline int num_schema_elements() const {
            return static_cast<int>(metadata_->schema.size());
        }

        inline bool is_encryption_algorithm_set() const {
            return metadata_->__isset.encryption_algorithm;
        }

        inline EncryptionAlgorithm encryption_algorithm() {
            return FromThrift(metadata_->encryption_algorithm);
        }

        inline const std::string &footer_signing_key_metadata() {
            return metadata_->footer_signing_key_metadata;
        }

        const ApplicationVersion &writer_version() const { return writer_version_; }

        void WriteTo(::nebula::io::OutputStream *dst,
                     const std::shared_ptr<Encryptor> &encryptor) const {
            ThriftSerializer serializer;
            // Only in encrypted files with plaintext footers the
            // encryption_algorithm is set in footer
            if (is_encryption_algorithm_set()) {
                uint8_t *serialized_data;
                uint32_t serialized_len;
                serializer.SerializeToBuffer(metadata_.get(), &serialized_len, &serialized_data);

                // encrypt the footer key
                std::vector<uint8_t> encrypted_data(encryptor->CiphertextSizeDelta() +
                                                    serialized_len);
                unsigned encrypted_len =
                        encryptor->Encrypt(serialized_data, serialized_len, encrypted_data.data());

                // write unencrypted footer
                PARQUET_THROW_NOT_OK(dst->write(serialized_data, serialized_len));
                // Write signature (nonce and tag)
                PARQUET_THROW_NOT_OK(
                    dst->write(encrypted_data.data() + 4, encryption::kNonceLength));
                PARQUET_THROW_NOT_OK(
                    dst->write(encrypted_data.data() + encrypted_len - encryption::kGcmTagLength,
                        encryption::kGcmTagLength));
            } else {
                // either plaintext file (when encryptor is null)
                // or encrypted file with encrypted footer
                serializer.Serialize(metadata_.get(), dst, encryptor.get());
            }
        }

        std::unique_ptr<RowGroupMetaData> RowGroup(int i) {
            if (!(i >= 0 && i < num_row_groups())) {
                std::stringstream ss;
                ss << "The file only has " << num_row_groups()
                        << " row groups, requested metadata for row group: " << i;
                throw ParquetException(ss.str());
            }
            return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, properties_,
                                          &writer_version_, file_decryptor_);
        }

        bool Equals(const FileMetaDataImpl &other) const {
            return *metadata_ == *other.metadata_;
        }

        const SchemaDescriptor *schema() const { return &schema_; }

        const std::shared_ptr<const KeyValueMetadata> &key_value_metadata() const {
            return key_value_metadata_;
        }

        void set_file_path(const std::string &path) {
            for (format::RowGroup &row_group: metadata_->row_groups) {
                for (format::ColumnChunk &chunk: row_group.columns) {
                    chunk.__set_file_path(path);
                }
            }
        }

        format::RowGroup &row_group(int i) {
            if (!(i >= 0 && i < num_row_groups())) {
                std::stringstream ss;
                ss << "The file only has " << num_row_groups()
                        << " row groups, requested metadata for row group: " << i;
                throw ParquetException(ss.str());
            }
            return metadata_->row_groups[i];
        }

        void AppendRowGroups(FileMetaDataImpl *other) {
            std::ostringstream diff_output;
            if (!schema()->Equals(*other->schema(), &diff_output)) {
                auto msg = "AppendRowGroups requires equal schemas.\n" + diff_output.str();
                throw ParquetException(msg);
            }

            // ARROW-13654: `other` may point to self, be careful not to enter an infinite loop
            const int n = other->num_row_groups();
            // ARROW-16613: do not use reserve() as that may suppress overallocation
            // and incur O(n²) behavior on repeated calls to AppendRowGroups().
            // (see https://en.cppreference.com/w/cpp/container/vector/reserve
            //  about inappropriate uses of reserve()).
            const auto start = metadata_->row_groups.size();
            metadata_->row_groups.resize(start + n);
            for (int i = 0; i < n; i++) {
                metadata_->row_groups[start + i] = other->row_group(i);
                metadata_->num_rows += metadata_->row_groups[start + i].num_rows;
            }
        }

        std::shared_ptr<FileMetaData> Subset(const std::vector<int> &row_groups) {
            for (int i: row_groups) {
                if (i < num_row_groups()) continue;

                throw ParquetException(
                    "The file only has ", num_row_groups(),
                    " row groups, but requested a subset including row group: ", i);
            }

            std::shared_ptr<FileMetaData> out(new FileMetaData());
            out->impl_ = std::make_unique<FileMetaDataImpl>();
            out->impl_->metadata_ = std::make_unique<format::FileMetaData>();

            auto metadata = out->impl_->metadata_.get();
            metadata->version = metadata_->version;
            metadata->schema = metadata_->schema;

            metadata->row_groups.resize(row_groups.size());

            int i = 0;
            for (int selected_index: row_groups) {
                metadata->num_rows += row_group(selected_index).num_rows;
                metadata->row_groups[i++] = row_group(selected_index);
            }

            metadata->key_value_metadata = metadata_->key_value_metadata;
            metadata->created_by = metadata_->created_by;
            metadata->column_orders = metadata_->column_orders;
            metadata->encryption_algorithm = metadata_->encryption_algorithm;
            metadata->footer_signing_key_metadata = metadata_->footer_signing_key_metadata;
            metadata->__isset = metadata_->__isset;

            out->impl_->schema_ = schema_;
            out->impl_->writer_version_ = writer_version_;
            out->impl_->key_value_metadata_ = key_value_metadata_;
            out->impl_->file_decryptor_ = file_decryptor_;

            return out;
        }

        void set_file_decryptor(std::shared_ptr<InternalFileDecryptor> file_decryptor) {
            file_decryptor_ = std::move(file_decryptor);
        }

        const std::shared_ptr<InternalFileDecryptor> &file_decryptor() const {
            return file_decryptor_;
        }

    private:
        friend FileMetaDataBuilder;
        uint32_t metadata_len_ = 0;
        std::unique_ptr<format::FileMetaData> metadata_;
        SchemaDescriptor schema_;
        ApplicationVersion writer_version_;
        std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
        const ReaderProperties properties_;
        std::shared_ptr<InternalFileDecryptor> file_decryptor_;

        void InitSchema() {
            if (metadata_->schema.empty()) {
                throw ParquetException("Empty file schema (no root)");
            }
            schema_.Init(schema::Unflatten(&metadata_->schema[0],
                                           static_cast<int>(metadata_->schema.size())));
        }

        void InitColumnOrders() {
            // update ColumnOrder
            std::vector<parquet::ColumnOrder> column_orders;
            if (metadata_->__isset.column_orders) {
                column_orders.reserve(metadata_->column_orders.size());
                for (auto column_order: metadata_->column_orders) {
                    if (column_order.__isset.TYPE_ORDER) {
                        column_orders.push_back(ColumnOrder::type_defined_);
                    } else {
                        column_orders.push_back(ColumnOrder::undefined_);
                    }
                }
            } else {
                column_orders.resize(schema_.num_columns(), ColumnOrder::undefined_);
            }

            schema_.updateColumnOrders(column_orders);
        }

        void InitKeyValueMetadata() {
            std::shared_ptr<KeyValueMetadata> metadata = nullptr;
            if (metadata_->__isset.key_value_metadata) {
                metadata = std::make_shared<KeyValueMetadata>();
                for (const auto &it: metadata_->key_value_metadata) {
                    metadata->append(it.key, it.value);
                }
            }
            key_value_metadata_ = std::move(metadata);
        }
    };

    std::shared_ptr<FileMetaData> FileMetaData::Make(
        const void *metadata, uint32_t *metadata_len, const ReaderProperties &properties,
        std::shared_ptr<InternalFileDecryptor> file_decryptor) {
        // This FileMetaData ctor is private, not compatible with std::make_shared
        return std::shared_ptr<FileMetaData>(
            new FileMetaData(metadata, metadata_len, properties, std::move(file_decryptor)));
    }

    std::shared_ptr<FileMetaData> FileMetaData::Make(
        const void *metadata, uint32_t *metadata_len,
        std::shared_ptr<InternalFileDecryptor> file_decryptor) {
        return std::shared_ptr<FileMetaData>(new FileMetaData(
            metadata, metadata_len, default_reader_properties(), std::move(file_decryptor)));
    }

    FileMetaData::FileMetaData(const void *metadata, uint32_t *metadata_len,
                               const ReaderProperties &properties,
                               std::shared_ptr<InternalFileDecryptor> file_decryptor)
        : impl_(new FileMetaDataImpl(metadata, metadata_len, properties,
                                     std::move(file_decryptor))) {
    }

    FileMetaData::FileMetaData() : impl_(new FileMetaDataImpl()) {
    }

    FileMetaData::~FileMetaData() = default;

    bool FileMetaData::Equals(const FileMetaData &other) const {
        return impl_->Equals(*other.impl_);
    }

    std::unique_ptr<RowGroupMetaData> FileMetaData::RowGroup(int i) const {
        return impl_->RowGroup(i);
    }

    bool FileMetaData::VerifySignature(const void *signature) {
        return impl_->VerifySignature(signature);
    }

    uint32_t FileMetaData::size() const { return impl_->size(); }

    int FileMetaData::num_columns() const { return impl_->num_columns(); }

    int64_t FileMetaData::num_rows() const { return impl_->num_rows(); }

    int FileMetaData::num_row_groups() const { return impl_->num_row_groups(); }

    bool FileMetaData::can_decompress() const {
        int n_row_groups = num_row_groups();
        for (int i = 0; i < n_row_groups; i++) {
            if (!RowGroup(i)->can_decompress()) {
                return false;
            }
        }
        return true;
    }

    bool FileMetaData::is_encryption_algorithm_set() const {
        return impl_->is_encryption_algorithm_set();
    }

    EncryptionAlgorithm FileMetaData::encryption_algorithm() const {
        return impl_->encryption_algorithm();
    }

    const std::string &FileMetaData::footer_signing_key_metadata() const {
        return impl_->footer_signing_key_metadata();
    }

    void FileMetaData::set_file_decryptor(
        std::shared_ptr<InternalFileDecryptor> file_decryptor) {
        impl_->set_file_decryptor(std::move(file_decryptor));
    }

    const std::shared_ptr<InternalFileDecryptor> &FileMetaData::file_decryptor() const {
        return impl_->file_decryptor();
    }

    ParquetVersion::type FileMetaData::version() const {
        switch (impl_->version()) {
            case 1:
                return ParquetVersion::PARQUET_1_0;
            case 2:
                return ParquetVersion::PARQUET_2_LATEST;
            default:
                // Improperly set version, assuming Parquet 1.0
                break;
        }
        return ParquetVersion::PARQUET_1_0;
    }

    const ApplicationVersion &FileMetaData::writer_version() const {
        return impl_->writer_version();
    }

    const std::string &FileMetaData::created_by() const { return impl_->created_by(); }

    int FileMetaData::num_schema_elements() const { return impl_->num_schema_elements(); }

    const SchemaDescriptor *FileMetaData::schema() const { return impl_->schema(); }

    const std::shared_ptr<const KeyValueMetadata> &FileMetaData::key_value_metadata() const {
        return impl_->key_value_metadata();
    }

    void FileMetaData::set_file_path(const std::string &path) { impl_->set_file_path(path); }

    void FileMetaData::AppendRowGroups(const FileMetaData &other) {
        impl_->AppendRowGroups(other.impl_.get());
    }

    std::shared_ptr<FileMetaData> FileMetaData::Subset(
        const std::vector<int> &row_groups) const {
        return impl_->Subset(row_groups);
    }

    void FileMetaData::WriteTo(::nebula::io::OutputStream *dst,
                               const std::shared_ptr<Encryptor> &encryptor) const {
        return impl_->WriteTo(dst, encryptor);
    }

    class FileCryptoMetaData::FileCryptoMetaDataImpl {
    public:
        FileCryptoMetaDataImpl() = default;

        explicit FileCryptoMetaDataImpl(const uint8_t *metadata, uint32_t *metadata_len,
                                        const ReaderProperties &properties) {
            ThriftDeserializer deserializer(properties);
            deserializer.DeserializeMessage(metadata, metadata_len, &metadata_);
            metadata_len_ = *metadata_len;
        }

        EncryptionAlgorithm encryption_algorithm() const {
            return FromThrift(metadata_.encryption_algorithm);
        }

        const std::string &key_metadata() const { return metadata_.key_metadata; }

        void WriteTo(::nebula::io::OutputStream *dst) const {
            ThriftSerializer serializer;
            serializer.Serialize(&metadata_, dst);
        }

    private:
        friend FileMetaDataBuilder;
        format::FileCryptoMetaData metadata_;
        uint32_t metadata_len_;
    };

    EncryptionAlgorithm FileCryptoMetaData::encryption_algorithm() const {
        return impl_->encryption_algorithm();
    }

    const std::string &FileCryptoMetaData::key_metadata() const {
        return impl_->key_metadata();
    }

    std::shared_ptr<FileCryptoMetaData> FileCryptoMetaData::Make(
        const uint8_t *serialized_metadata, uint32_t *metadata_len,
        const ReaderProperties &properties) {
        return std::shared_ptr<FileCryptoMetaData>(
            new FileCryptoMetaData(serialized_metadata, metadata_len, properties));
    }

    FileCryptoMetaData::FileCryptoMetaData(const uint8_t *serialized_metadata,
                                           uint32_t *metadata_len,
                                           const ReaderProperties &properties)
        : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len, properties)) {
    }

    FileCryptoMetaData::FileCryptoMetaData() : impl_(new FileCryptoMetaDataImpl()) {
    }

    FileCryptoMetaData::~FileCryptoMetaData() = default;

    void FileCryptoMetaData::WriteTo(::nebula::io::OutputStream *dst) const {
        impl_->WriteTo(dst);
    }

    std::string FileMetaData::SerializeToString() const {
        // We need to pass in an initial size. Since it will automatically
        // increase the buffer size to hold the metadata, we just leave it 0.
        PARQUET_ASSIGN_OR_THROW(auto serializer, ::nebula::io::BufferOutputStream::create(0));
        WriteTo(serializer.get());
        PARQUET_ASSIGN_OR_THROW(auto metadata_buffer, serializer->finish());
        return metadata_buffer->to_string();
    }

    ApplicationVersion::ApplicationVersion(std::string application, int major, int minor,
                                           int patch)
        : application_(std::move(application)), version{major, minor, patch, "", "", ""} {
    }

    namespace {
        // Parse the application version format and set parsed values to
        // ApplicationVersion.
        //
        // The application version format must be compatible parquet-mr's
        // one. See also:
        //   * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
        //   * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
        //
        // The application version format:
        //   "${APPLICATION_NAME}"
        //   "${APPLICATION_NAME} version ${VERSION}"
        //   "${APPLICATION_NAME} version ${VERSION} (build ${BUILD_NAME})"
        //
        // Eg:
        //   parquet-cpp
        //   parquet-cpp version 1.5.0ab-xyz5.5.0+cd
        //   parquet-cpp version 1.5.0ab-xyz5.5.0+cd (build abcd)
        //
        // The VERSION format:
        //   "${MAJOR}"
        //   "${MAJOR}.${MINOR}"
        //   "${MAJOR}.${MINOR}.${PATCH}"
        //   "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}"
        //   "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}"
        //   "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}+${BUILD_INFO}"
        //   "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}+${BUILD_INFO}"
        //   "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}"
        //   "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}+${BUILD_INFO}"
        //   "${MAJOR}.${MINOR}.${PATCH}+${BUILD_INFO}"
        //
        // Eg:
        //   1
        //   1.5
        //   1.5.0
        //   1.5.0ab
        //   1.5.0ab-cdh5.5.0
        //   1.5.0ab-cdh5.5.0+cd
        //   1.5.0ab+cd
        //   1.5.0-cdh5.5.0
        //   1.5.0-cdh5.5.0+cd
        //   1.5.0+cd
        class ApplicationVersionParser {
        public:
            ApplicationVersionParser(const std::string &created_by,
                                     ApplicationVersion &application_version)
                : created_by_(created_by),
                  application_version_(application_version),
                  spaces_(" \t\v\r\n\f"),
                  digits_("0123456789") {
            }

            void Parse() {
                application_version_.application_ = "unknown";
                application_version_.version = {0, 0, 0, "", "", ""};

                if (!ParseApplicationName()) {
                    return;
                }
                if (!ParseVersion()) {
                    return;
                }
                if (!ParseBuildName()) {
                    return;
                }
            }

        private:
            bool IsSpace(const std::string &string, const size_t &offset) {
                auto target = ::std::string_view(string).substr(offset, 1);
                return target.find_first_of(spaces_) != ::std::string_view::npos;
            }

            void RemovePrecedingSpaces(const std::string &string, size_t &start,
                                       const size_t &end) {
                while (start < end && IsSpace(string, start)) {
                    ++start;
                }
            }

            void RemoveTrailingSpaces(const std::string &string, const size_t &start, size_t &end) {
                while (start < (end - 1) && (end - 1) < string.size() && IsSpace(string, end - 1)) {
                    --end;
                }
            }

            bool ParseApplicationName() {
                std::string version_mark(" version ");
                auto version_mark_position = created_by_.find(version_mark);
                size_t application_name_end;
                // No VERSION and BUILD_NAME.
                if (version_mark_position == std::string::npos) {
                    version_start_ = std::string::npos;
                    application_name_end = created_by_.size();
                } else {
                    version_start_ = version_mark_position + version_mark.size();
                    application_name_end = version_mark_position;
                }

                size_t application_name_start = 0;
                RemovePrecedingSpaces(created_by_, application_name_start, application_name_end);
                RemoveTrailingSpaces(created_by_, application_name_start, application_name_end);
                application_version_.application_ = created_by_.substr(
                    application_name_start, application_name_end - application_name_start);

                return true;
            }

            bool ParseVersion() {
                // No VERSION.
                if (version_start_ == std::string::npos) {
                    return false;
                }

                RemovePrecedingSpaces(created_by_, version_start_, created_by_.size());
                version_end_ = created_by_.find(" (", version_start_);
                // No BUILD_NAME.
                if (version_end_ == std::string::npos) {
                    version_end_ = created_by_.size();
                }
                RemoveTrailingSpaces(created_by_, version_start_, version_end_);
                // No VERSION.
                if (version_start_ == version_end_) {
                    return false;
                }
                version_string_ = created_by_.substr(version_start_, version_end_ - version_start_);

                if (!ParseVersionMajor()) {
                    return false;
                }
                if (!ParseVersionMinor()) {
                    return false;
                }
                if (!ParseVersionPatch()) {
                    return false;
                }
                if (!ParseVersionUnknown()) {
                    return false;
                }
                if (!ParseVersionPreRelease()) {
                    return false;
                }
                if (!ParseVersionBuildInfo()) {
                    return false;
                }

                return true;
            }

            bool ParseVersionMajor() {
                size_t version_major_start = 0;
                auto version_major_end = version_string_.find_first_not_of(digits_);
                // MAJOR only.
                if (version_major_end == std::string::npos) {
                    version_major_end = version_string_.size();
                    version_parsing_position_ = version_major_end;
                } else {
                    // No ".".
                    if (version_string_[version_major_end] != '.') {
                        return false;
                    }
                    // No MAJOR.
                    if (version_major_end == version_major_start) {
                        return false;
                    }
                    version_parsing_position_ = version_major_end + 1; // +1 is for '.'.
                }
                auto version_major_string = version_string_.substr(
                    version_major_start, version_major_end - version_major_start);
                application_version_.version.major = atoi(version_major_string.c_str());
                return true;
            }

            bool ParseVersionMinor() {
                auto version_minor_start = version_parsing_position_;
                auto version_minor_end =
                        version_string_.find_first_not_of(digits_, version_minor_start);
                // MAJOR.MINOR only.
                if (version_minor_end == std::string::npos) {
                    version_minor_end = version_string_.size();
                    version_parsing_position_ = version_minor_end;
                } else {
                    // No ".".
                    if (version_string_[version_minor_end] != '.') {
                        return false;
                    }
                    // No MINOR.
                    if (version_minor_end == version_minor_start) {
                        return false;
                    }
                    version_parsing_position_ = version_minor_end + 1; // +1 is for '.'.
                }
                auto version_minor_string = version_string_.substr(
                    version_minor_start, version_minor_end - version_minor_start);
                application_version_.version.minor = atoi(version_minor_string.c_str());
                return true;
            }

            bool ParseVersionPatch() {
                auto version_patch_start = version_parsing_position_;
                auto version_patch_end =
                        version_string_.find_first_not_of(digits_, version_patch_start);
                // No UNKNOWN, PRE_RELEASE and BUILD_INFO.
                if (version_patch_end == std::string::npos) {
                    version_patch_end = version_string_.size();
                }
                // No PATCH.
                if (version_patch_end == version_patch_start) {
                    return false;
                }
                auto version_patch_string = version_string_.substr(
                    version_patch_start, version_patch_end - version_patch_start);
                application_version_.version.patch = atoi(version_patch_string.c_str());
                version_parsing_position_ = version_patch_end;
                return true;
            }

            bool ParseVersionUnknown() {
                // No UNKNOWN.
                if (version_parsing_position_ == version_string_.size()) {
                    return true;
                }
                auto version_unknown_start = version_parsing_position_;
                auto version_unknown_end = version_string_.find_first_of("-+", version_unknown_start);
                // No PRE_RELEASE and BUILD_INFO
                if (version_unknown_end == std::string::npos) {
                    version_unknown_end = version_string_.size();
                }
                application_version_.version.unknown = version_string_.substr(
                    version_unknown_start, version_unknown_end - version_unknown_start);
                version_parsing_position_ = version_unknown_end;
                return true;
            }

            bool ParseVersionPreRelease() {
                // No PRE_RELEASE.
                if (version_parsing_position_ == version_string_.size() ||
                    version_string_[version_parsing_position_] != '-') {
                    return true;
                }

                auto version_pre_release_start = version_parsing_position_ + 1; // +1 is for '-'.
                auto version_pre_release_end =
                        version_string_.find_first_of("+", version_pre_release_start);
                // No BUILD_INFO
                if (version_pre_release_end == std::string::npos) {
                    version_pre_release_end = version_string_.size();
                }
                application_version_.version.pre_release = version_string_.substr(
                    version_pre_release_start, version_pre_release_end - version_pre_release_start);
                version_parsing_position_ = version_pre_release_end;
                return true;
            }

            bool ParseVersionBuildInfo() {
                // No BUILD_INFO.
                if (version_parsing_position_ == version_string_.size() ||
                    version_string_[version_parsing_position_] != '+') {
                    return true;
                }

                auto version_build_info_start = version_parsing_position_ + 1; // +1 is for '+'.
                application_version_.version.build_info =
                        version_string_.substr(version_build_info_start);
                return true;
            }

            bool ParseBuildName() {
                std::string build_mark(" (build ");
                auto build_mark_position = created_by_.find(build_mark, version_end_);
                // No BUILD_NAME.
                if (build_mark_position == std::string::npos) {
                    return false;
                }
                auto build_name_start = build_mark_position + build_mark.size();
                RemovePrecedingSpaces(created_by_, build_name_start, created_by_.size());
                auto build_name_end = created_by_.find_first_of(")", build_name_start);
                // No end ")".
                if (build_name_end == std::string::npos) {
                    return false;
                }
                RemoveTrailingSpaces(created_by_, build_name_start, build_name_end);
                application_version_.build_ =
                        created_by_.substr(build_name_start, build_name_end - build_name_start);

                return true;
            }

            const std::string &created_by_;
            ApplicationVersion &application_version_;

            // For parsing.
            std::string spaces_;
            std::string digits_;
            size_t version_parsing_position_;
            size_t version_start_;
            size_t version_end_;
            std::string version_string_;
        };
    } // namespace

    ApplicationVersion::ApplicationVersion(const std::string &created_by) {
        ApplicationVersionParser parser(created_by, *this);
        parser.Parse();
    }

    bool ApplicationVersion::VersionLt(const ApplicationVersion &other_version) const {
        if (application_ != other_version.application_) return false;

        if (version.major < other_version.version.major) return true;
        if (version.major > other_version.version.major) return false;
        DKCHECK_EQ(version.major, other_version.version.major);
        if (version.minor < other_version.version.minor) return true;
        if (version.minor > other_version.version.minor) return false;
        DKCHECK_EQ(version.minor, other_version.version.minor);
        return version.patch < other_version.version.patch;
    }

    bool ApplicationVersion::VersionEq(const ApplicationVersion &other_version) const {
        return application_ == other_version.application_ &&
               version.major == other_version.version.major &&
               version.minor == other_version.version.minor &&
               version.patch == other_version.version.patch;
    }

    // Reference:
    // parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
    // PARQUET-686 has more discussion on statistics
    bool ApplicationVersion::HasCorrectStatistics(Type::type col_type,
                                                  EncodedStatistics &statistics,
                                                  SortOrder::type sort_order) const {
        // parquet-cpp version 1.3.0 and parquet-mr 1.10.0 onwards stats are computed
        // correctly for all types
        if ((application_ == "parquet-cpp" && VersionLt(PARQUET_CPP_FIXED_STATS_VERSION())) ||
            (application_ == "parquet-mr" && VersionLt(PARQUET_MR_FIXED_STATS_VERSION()))) {
            // Only SIGNED are valid unless max and min are the same
            // (in which case the sort order does not matter)
            bool max_equals_min = statistics.has_min && statistics.has_max
                                      ? statistics.min() == statistics.max()
                                      : false;
            if (SortOrder::SIGNED != sort_order && !max_equals_min) {
                return false;
            }

            // Statistics of other types are OK
            if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) {
                return true;
            }
        }
        // created_by is not populated, which could have been caused by
        // parquet-mr during the same time as PARQUET-251, see PARQUET-297
        if (application_ == "unknown") {
            return true;
        }

        // Unknown sort order has incorrect stats
        if (SortOrder::UNKNOWN == sort_order) {
            return false;
        }

        // PARQUET-251
        if (VersionLt(PARQUET_251_FIXED_VERSION())) {
            return false;
        }

        return true;
    }

    // MetaData Builders
    // row-group metadata
    class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
    public:
        explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props,
                                                const ColumnDescriptor *column)
            : owned_column_chunk_(new format::ColumnChunk),
              properties_(std::move(props)),
              column_(column) {
            Init(owned_column_chunk_.get());
        }

        explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props,
                                                const ColumnDescriptor *column,
                                                format::ColumnChunk *column_chunk)
            : properties_(std::move(props)), column_(column) {
            Init(column_chunk);
        }

        const void *contents() const { return column_chunk_; }

        // column chunk
        void set_file_path(const std::string &val) { column_chunk_->__set_file_path(val); }

        // column metadata
        void SetStatistics(const EncodedStatistics &val) {
            column_chunk_->meta_data.__set_statistics(ToThrift(val));
        }

        void Finish(int64_t num_values, int64_t dictionary_page_offset,
                    int64_t index_page_offset, int64_t data_page_offset,
                    int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
                    bool dictionary_fallback,
                    const std::map<Encoding::type, int32_t> &dict_encoding_stats,
                    const std::map<Encoding::type, int32_t> &data_encoding_stats,
                    const std::shared_ptr<Encryptor> &encryptor) {
            if (dictionary_page_offset > 0) {
                column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset);
                column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
            } else {
                column_chunk_->__set_file_offset(data_page_offset + compressed_size);
            }
            column_chunk_->__isset.meta_data = true;
            column_chunk_->meta_data.__set_num_values(num_values);
            if (index_page_offset >= 0) {
                column_chunk_->meta_data.__set_index_page_offset(index_page_offset);
            }
            column_chunk_->meta_data.__set_data_page_offset(data_page_offset);
            column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size);
            column_chunk_->meta_data.__set_total_compressed_size(compressed_size);

            std::vector<format::Encoding::type> thrift_encodings;
            std::vector<format::PageEncodingStats> thrift_encoding_stats;
            auto add_encoding = [&thrift_encodings](format::Encoding::type value) {
                auto it = std::find(thrift_encodings.begin(), thrift_encodings.end(), value);
                if (it == thrift_encodings.end()) {
                    thrift_encodings.push_back(value);
                }
            };
            // Add dictionary page encoding stats
            if (has_dictionary) {
                for (const auto &entry: dict_encoding_stats) {
                    format::PageEncodingStats dict_enc_stat;
                    dict_enc_stat.__set_page_type(format::PageType::DICTIONARY_PAGE);
                    // Dictionary Encoding would be PLAIN_DICTIONARY in v1 and
                    // PLAIN in v2.
                    format::Encoding::type dict_encoding = ToThrift(entry.first);
                    dict_enc_stat.__set_encoding(dict_encoding);
                    dict_enc_stat.__set_count(entry.second);
                    thrift_encoding_stats.push_back(dict_enc_stat);
                    add_encoding(dict_encoding);
                }
            }
            // Always add encoding for RL/DL.
            // BIT_PACKED is supported in `LevelEncoder`, but would only be used
            // in benchmark and testing.
            // And for now, we always add RLE even if there are no levels at all,
            // while parquet-mr is more fine-grained.
            add_encoding(format::Encoding::RLE);
            // Add data page encoding stats
            for (const auto &entry: data_encoding_stats) {
                format::PageEncodingStats data_enc_stat;
                data_enc_stat.__set_page_type(format::PageType::DATA_PAGE);
                format::Encoding::type data_encoding = ToThrift(entry.first);
                data_enc_stat.__set_encoding(data_encoding);
                data_enc_stat.__set_count(entry.second);
                thrift_encoding_stats.push_back(data_enc_stat);
                add_encoding(data_encoding);
            }
            column_chunk_->meta_data.__set_encodings(std::move(thrift_encodings));
            column_chunk_->meta_data.__set_encoding_stats(std::move(thrift_encoding_stats));

            const auto &encrypt_md =
                    properties_->column_encryption_properties(column_->path()->ToDotString());
            // column is encrypted
            if (encrypt_md != nullptr && encrypt_md->is_encrypted()) {
                column_chunk_->__isset.crypto_metadata = true;
                format::ColumnCryptoMetaData ccmd;
                if (encrypt_md->is_encrypted_with_footer_key()) {
                    // encrypted with footer key
                    ccmd.__isset.ENCRYPTION_WITH_FOOTER_KEY = true;
                    ccmd.__set_ENCRYPTION_WITH_FOOTER_KEY(format::EncryptionWithFooterKey());
                } else {
                    // encrypted with column key
                    format::EncryptionWithColumnKey eck;
                    eck.__set_key_metadata(encrypt_md->key_metadata());
                    eck.__set_path_in_schema(column_->path()->ToDotVector());
                    ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY = true;
                    ccmd.__set_ENCRYPTION_WITH_COLUMN_KEY(eck);
                }
                column_chunk_->__set_crypto_metadata(std::move(ccmd));

                bool encrypted_footer =
                        properties_->file_encryption_properties()->encrypted_footer();
                bool encrypt_metadata =
                        !encrypted_footer || !encrypt_md->is_encrypted_with_footer_key();
                if (encrypt_metadata) {
                    ThriftSerializer serializer;
                    // Serialize and encrypt ColumnMetadata separately
                    // Thrift-serialize the ColumnMetaData structure,
                    // encrypt it with the column key, and write to encrypted_column_metadata
                    uint8_t *serialized_data;
                    uint32_t serialized_len;

                    serializer.SerializeToBuffer(&column_chunk_->meta_data, &serialized_len,
                                                 &serialized_data);

                    std::vector<uint8_t> encrypted_data(encryptor->CiphertextSizeDelta() +
                                                        serialized_len);
                    unsigned encrypted_len =
                            encryptor->Encrypt(serialized_data, serialized_len, encrypted_data.data());

                    const char *temp =
                            const_cast<const char *>(reinterpret_cast<char *>(encrypted_data.data()));
                    std::string encrypted_column_metadata(temp, encrypted_len);
                    column_chunk_->__set_encrypted_column_metadata(encrypted_column_metadata);

                    if (encrypted_footer) {
                        column_chunk_->__isset.meta_data = false;
                    } else {
                        // Keep redacted metadata version for old readers
                        column_chunk_->__isset.meta_data = true;
                        column_chunk_->meta_data.__isset.statistics = false;
                        column_chunk_->meta_data.__isset.encoding_stats = false;
                    }
                }
            }
        }

        void WriteTo(::nebula::io::OutputStream *sink) {
            ThriftSerializer serializer;
            serializer.Serialize(column_chunk_, sink);
        }

        const ColumnDescriptor *descr() const { return column_; }

        int64_t total_compressed_size() const {
            return column_chunk_->meta_data.total_compressed_size;
        }

    private:
        void Init(format::ColumnChunk *column_chunk) {
            column_chunk_ = column_chunk;

            column_chunk_->meta_data.__set_type(ToThrift(column_->physical_type()));
            column_chunk_->meta_data.__set_path_in_schema(column_->path()->ToDotVector());
            column_chunk_->meta_data.__set_codec(
                ToThrift(properties_->compression(column_->path())));
        }

        format::ColumnChunk *column_chunk_;
        std::unique_ptr<format::ColumnChunk> owned_column_chunk_;
        const std::shared_ptr<WriterProperties> properties_;
        const ColumnDescriptor *column_;
    };

    std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
        std::shared_ptr<WriterProperties> props, const ColumnDescriptor *column,
        void *contents) {
        return std::unique_ptr<ColumnChunkMetaDataBuilder>(
            new ColumnChunkMetaDataBuilder(std::move(props), column, contents));
    }

    std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
        std::shared_ptr<WriterProperties> props, const ColumnDescriptor *column) {
        return std::unique_ptr<ColumnChunkMetaDataBuilder>(
            new ColumnChunkMetaDataBuilder(std::move(props), column));
    }

    ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
        std::shared_ptr<WriterProperties> props, const ColumnDescriptor *column)
        : impl_{std::make_unique<ColumnChunkMetaDataBuilderImpl>(std::move(props), column)} {
    }

    ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder(
        std::shared_ptr<WriterProperties> props, const ColumnDescriptor *column,
        void *contents)
        : impl_{
            std::make_unique<ColumnChunkMetaDataBuilderImpl>(
                std::move(props), column, reinterpret_cast<format::ColumnChunk *>(contents))
        } {
    }

    ColumnChunkMetaDataBuilder::~ColumnChunkMetaDataBuilder() = default;

    const void *ColumnChunkMetaDataBuilder::contents() const { return impl_->contents(); }

    void ColumnChunkMetaDataBuilder::set_file_path(const std::string &path) {
        impl_->set_file_path(path);
    }

    void ColumnChunkMetaDataBuilder::Finish(
        int64_t num_values, int64_t dictionary_page_offset, int64_t index_page_offset,
        int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size,
        bool has_dictionary, bool dictionary_fallback,
        const std::map<Encoding::type, int32_t> &dict_encoding_stats,
        const std::map<Encoding::type, int32_t> &data_encoding_stats,
        const std::shared_ptr<Encryptor> &encryptor) {
        impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset,
                      compressed_size, uncompressed_size, has_dictionary, dictionary_fallback,
                      dict_encoding_stats, data_encoding_stats, encryptor);
    }

    void ColumnChunkMetaDataBuilder::WriteTo(::nebula::io::OutputStream *sink) {
        impl_->WriteTo(sink);
    }

    const ColumnDescriptor *ColumnChunkMetaDataBuilder::descr() const {
        return impl_->descr();
    }

    void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics &result) {
        impl_->SetStatistics(result);
    }

    int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const {
        return impl_->total_compressed_size();
    }

    class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
    public:
        explicit RowGroupMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props,
                                             const SchemaDescriptor *schema, void *contents)
            : properties_(std::move(props)), schema_(schema), next_column_(0) {
            row_group_ = reinterpret_cast<format::RowGroup *>(contents);
            InitializeColumns(schema->num_columns());
        }

        ColumnChunkMetaDataBuilder *NextColumnChunk() {
            if (!(next_column_ < num_columns())) {
                std::stringstream ss;
                ss << "The schema only has " << num_columns()
                        << " columns, requested metadata for column: " << next_column_;
                throw ParquetException(ss.str());
            }
            auto column = schema_->Column(next_column_);
            auto column_builder = ColumnChunkMetaDataBuilder::Make(
                properties_, column, &row_group_->columns[next_column_++]);
            auto column_builder_ptr = column_builder.get();
            column_builders_.push_back(std::move(column_builder));
            return column_builder_ptr;
        }

        int current_column() { return next_column_ - 1; }

        void Finish(int64_t total_bytes_written, int16_t row_group_ordinal) {
            if (!(next_column_ == schema_->num_columns())) {
                std::stringstream ss;
                ss << "Only " << next_column_ - 1 << " out of " << schema_->num_columns()
                        << " columns are initialized";
                throw ParquetException(ss.str());
            }

            int64_t file_offset = 0;
            int64_t total_compressed_size = 0;
            for (int i = 0; i < schema_->num_columns(); i++) {
                if (!(row_group_->columns[i].file_offset >= 0)) {
                    std::stringstream ss;
                    ss << "Column " << i << " is not complete.";
                    throw ParquetException(ss.str());
                }
                if (i == 0) {
                    const format::ColumnMetaData &first_col = row_group_->columns[0].meta_data;
                    // As per spec, file_offset for the row group points to the first
                    // dictionary or data page of the column.
                    if (first_col.__isset.dictionary_page_offset &&
                        first_col.dictionary_page_offset > 0) {
                        file_offset = first_col.dictionary_page_offset;
                    } else {
                        file_offset = first_col.data_page_offset;
                    }
                }
                // sometimes column metadata is encrypted and not available to read,
                // so we must get total_compressed_size from column builder
                total_compressed_size += column_builders_[i]->total_compressed_size();
            }

            const auto &sorting_columns = properties_->sorting_columns();
            if (!sorting_columns.empty()) {
                std::vector<format::SortingColumn> thrift_sorting_columns(sorting_columns.size());
                for (size_t i = 0; i < sorting_columns.size(); ++i) {
                    thrift_sorting_columns[i] = ToThrift(sorting_columns[i]);
                }
                row_group_->__set_sorting_columns(std::move(thrift_sorting_columns));
            }

            row_group_->__set_file_offset(file_offset);
            row_group_->__set_total_compressed_size(total_compressed_size);
            row_group_->__set_total_byte_size(total_bytes_written);
            row_group_->__set_ordinal(row_group_ordinal);
        }

        void set_num_rows(int64_t num_rows) { row_group_->num_rows = num_rows; }

        int num_columns() { return static_cast<int>(row_group_->columns.size()); }

        int64_t num_rows() { return row_group_->num_rows; }

    private:
        void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); }

        format::RowGroup *row_group_;
        const std::shared_ptr<WriterProperties> properties_;
        const SchemaDescriptor *schema_;
        std::vector<std::unique_ptr<ColumnChunkMetaDataBuilder> > column_builders_;
        int next_column_;
    };

    std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
        std::shared_ptr<WriterProperties> props, const SchemaDescriptor *schema_,
        void *contents) {
        return std::unique_ptr<RowGroupMetaDataBuilder>(
            new RowGroupMetaDataBuilder(std::move(props), schema_, contents));
    }

    RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(std::shared_ptr<WriterProperties> props,
                                                     const SchemaDescriptor *schema_,
                                                     void *contents)
        : impl_{new RowGroupMetaDataBuilderImpl(std::move(props), schema_, contents)} {
    }

    RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() = default;

    ColumnChunkMetaDataBuilder *RowGroupMetaDataBuilder::NextColumnChunk() {
        return impl_->NextColumnChunk();
    }

    int RowGroupMetaDataBuilder::current_column() const { return impl_->current_column(); }

    int RowGroupMetaDataBuilder::num_columns() { return impl_->num_columns(); }

    int64_t RowGroupMetaDataBuilder::num_rows() { return impl_->num_rows(); }

    void RowGroupMetaDataBuilder::set_num_rows(int64_t num_rows) {
        impl_->set_num_rows(num_rows);
    }

    void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written,
                                         int16_t row_group_ordinal) {
        impl_->Finish(total_bytes_written, row_group_ordinal);
    }

    // file metadata
    class FileMetaDataBuilder::FileMetaDataBuilderImpl {
    public:
        explicit FileMetaDataBuilderImpl(
            const SchemaDescriptor *schema, std::shared_ptr<WriterProperties> props,
            std::shared_ptr<const KeyValueMetadata> key_value_metadata)
            : metadata_(new format::FileMetaData()),
              properties_(std::move(props)),
              schema_(schema),
              key_value_metadata_(std::move(key_value_metadata)) {
            if (properties_->file_encryption_properties() != nullptr &&
                properties_->file_encryption_properties()->encrypted_footer()) {
                crypto_metadata_ = std::make_unique<format::FileCryptoMetaData>();
            }
        }

        RowGroupMetaDataBuilder *AppendRowGroup() {
            row_groups_.emplace_back();
            current_row_group_builder_ =
                    RowGroupMetaDataBuilder::Make(properties_, schema_, &row_groups_.back());
            return current_row_group_builder_.get();
        }

        void SetPageIndexLocation(const PageIndexLocation &location) {
            auto set_index_location =
                    [this](size_t row_group_ordinal,
                           const PageIndexLocation::FileIndexLocation &file_index_location,
                           bool column_index) {
                auto &row_group_metadata = this->row_groups_.at(row_group_ordinal);
                auto iter = file_index_location.find(row_group_ordinal);
                if (iter != file_index_location.cend()) {
                    const auto &row_group_index_location = iter->second;
                    for (size_t i = 0; i < row_group_index_location.size(); ++i) {
                        if (i >= row_group_metadata.columns.size()) {
                            throw ParquetException("Cannot find metadata for column ordinal ", i);
                        }
                        auto &column_metadata = row_group_metadata.columns.at(i);
                        const auto &index_location = row_group_index_location.at(i);
                        if (index_location.has_value()) {
                            if (column_index) {
                                column_metadata.__set_column_index_offset(index_location->offset);
                                column_metadata.__set_column_index_length(index_location->length);
                            } else {
                                column_metadata.__set_offset_index_offset(index_location->offset);
                                column_metadata.__set_offset_index_length(index_location->length);
                            }
                        }
                    }
                }
            };

            for (size_t i = 0; i < row_groups_.size(); ++i) {
                set_index_location(i, location.column_index_location, true);
                set_index_location(i, location.offset_index_location, false);
            }
        }

        std::unique_ptr<FileMetaData> Finish(
            const std::shared_ptr<const KeyValueMetadata> &key_value_metadata) {
            int64_t total_rows = 0;
            for (const auto &row_group: row_groups_) {
                total_rows += row_group.num_rows;
            }
            metadata_->__set_num_rows(total_rows);
            metadata_->__set_row_groups(row_groups_);

            if (key_value_metadata_ || key_value_metadata) {
                if (!key_value_metadata_) {
                    key_value_metadata_ = key_value_metadata;
                } else if (key_value_metadata) {
                    key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
                }
                metadata_->key_value_metadata.clear();
                metadata_->key_value_metadata.reserve(
                    static_cast<size_t>(key_value_metadata_->size()));
                for (int64_t i = 0; i < key_value_metadata_->size(); ++i) {
                    format::KeyValue kv_pair;
                    kv_pair.__set_key(key_value_metadata_->key(i));
                    kv_pair.__set_value(key_value_metadata_->value(i));
                    metadata_->key_value_metadata.push_back(std::move(kv_pair));
                }
                metadata_->__isset.key_value_metadata = true;
            }

            int32_t file_version = 0;
            switch (properties_->version()) {
                case ParquetVersion::PARQUET_1_0:
                    file_version = 1;
                    break;
                default:
                    file_version = 2;
                    break;
            }
            metadata_->__set_version(file_version);
            metadata_->__set_created_by(properties_->created_by());

            // Users cannot set the `ColumnOrder` since we do not have user defined sort order
            // in the spec yet.
            // We always default to `TYPE_DEFINED_ORDER`. We can expose it in
            // the API once we have user defined sort orders in the Parquet format.
            // TypeDefinedOrder implies choose SortOrder based on ConvertedType/PhysicalType
            format::TypeDefinedOrder type_defined_order;
            format::ColumnOrder column_order;
            column_order.__set_TYPE_ORDER(type_defined_order);
            column_order.__isset.TYPE_ORDER = true;
            metadata_->column_orders.resize(schema_->num_columns(), column_order);
            metadata_->__isset.column_orders = true;

            // if plaintext footer, set footer signing algorithm
            auto file_encryption_properties = properties_->file_encryption_properties();
            if (file_encryption_properties && !file_encryption_properties->encrypted_footer()) {
                EncryptionAlgorithm signing_algorithm;
                EncryptionAlgorithm algo = file_encryption_properties->algorithm();
                signing_algorithm.aad.aad_file_unique = algo.aad.aad_file_unique;
                signing_algorithm.aad.supply_aad_prefix = algo.aad.supply_aad_prefix;
                if (!algo.aad.supply_aad_prefix) {
                    signing_algorithm.aad.aad_prefix = algo.aad.aad_prefix;
                }
                signing_algorithm.algorithm = ParquetCipher::AES_GCM_V1;

                metadata_->__set_encryption_algorithm(ToThrift(signing_algorithm));
                const std::string &footer_signing_key_metadata =
                        file_encryption_properties->footer_key_metadata();
                if (footer_signing_key_metadata.size() > 0) {
                    metadata_->__set_footer_signing_key_metadata(footer_signing_key_metadata);
                }
            }

            ToParquet(static_cast<parquet::schema::GroupNode *>(schema_->schema_root().get()),
                      &metadata_->schema);
            auto file_meta_data = std::unique_ptr<FileMetaData>(new FileMetaData());
            file_meta_data->impl_->metadata_ = std::move(metadata_);
            file_meta_data->impl_->InitSchema();
            file_meta_data->impl_->InitKeyValueMetadata();
            return file_meta_data;
        }

        std::unique_ptr<FileCryptoMetaData> BuildFileCryptoMetaData() {
            if (crypto_metadata_ == nullptr) {
                return nullptr;
            }

            auto file_encryption_properties = properties_->file_encryption_properties();

            crypto_metadata_->__set_encryption_algorithm(
                ToThrift(file_encryption_properties->algorithm()));
            std::string key_metadata = file_encryption_properties->footer_key_metadata();

            if (!key_metadata.empty()) {
                crypto_metadata_->__set_key_metadata(key_metadata);
            }

            std::unique_ptr<FileCryptoMetaData> file_crypto_metadata(new FileCryptoMetaData());
            file_crypto_metadata->impl_->metadata_ = std::move(*crypto_metadata_);
            return file_crypto_metadata;
        }

    protected:
        std::unique_ptr<format::FileMetaData> metadata_;
        std::unique_ptr<format::FileCryptoMetaData> crypto_metadata_;

    private:
        const std::shared_ptr<WriterProperties> properties_;
        std::vector<format::RowGroup> row_groups_;

        std::unique_ptr<RowGroupMetaDataBuilder> current_row_group_builder_;
        const SchemaDescriptor *schema_;
        std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
    };

    std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
        const SchemaDescriptor *schema, std::shared_ptr<WriterProperties> props,
        std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
        return std::unique_ptr<FileMetaDataBuilder>(
            new FileMetaDataBuilder(schema, std::move(props), std::move(key_value_metadata)));
    }

    std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
        const SchemaDescriptor *schema, std::shared_ptr<WriterProperties> props) {
        return std::unique_ptr<FileMetaDataBuilder>(
            new FileMetaDataBuilder(schema, std::move(props)));
    }

    FileMetaDataBuilder::FileMetaDataBuilder(
        const SchemaDescriptor *schema, std::shared_ptr<WriterProperties> props,
        std::shared_ptr<const KeyValueMetadata> key_value_metadata)
        : impl_{
            std::make_unique<FileMetaDataBuilderImpl>(schema, std::move(props),
                                                      std::move(key_value_metadata))
        } {
    }

    FileMetaDataBuilder::~FileMetaDataBuilder() = default;

    RowGroupMetaDataBuilder *FileMetaDataBuilder::AppendRowGroup() {
        return impl_->AppendRowGroup();
    }

    void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation &location) {
        impl_->SetPageIndexLocation(location);
    }

    std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish(
        const std::shared_ptr<const KeyValueMetadata> &key_value_metadata) {
        return impl_->Finish(key_value_metadata);
    }

    std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData() {
        return impl_->BuildFileCryptoMetaData();
    }
} // namespace parquet
